Task : Multi-Task management

更新时间:
2024-05-14

Task : Multi-Task management

Task module provides multi-tasking environment support for JSRE.

Each Task is an independent thread in the operating system. The operating system can be independently scheduled according to the scheduling policy. The multitasking design can improve the application parallelism.

Why Multi-Task

Although the runtime platform such as Node.js is parallelized by a multi-threaded asynchronous proxy, the core program process can't be parallelized, and the programmer's controllability is too low and there is no controllable policy. JSRE is different. JSRE hopes that programmers have the opportunity to control the parallel design and control the real-time task scheduling policy.

Multi-tasking design also provides better application code decoupling, application module separation and development, and easier to build more complex large-scale applications.

Easy to develop

Multitasking also causes some development difficulties, including shared resource safety, synchronization and mutual exclusion. In order to reduce the development difficulty, JSRE allocates language environment context for each task, and the functions and objects in the context are not shared between multitasking. This design can greatly reduce the development difficulty.

In order to solve the efficiency reduce caused by data non-sharing, JSRE provides a feature-rich and efficient inter-task communication mechanism, including shared memory (Shared.ArrayBuffer), subscription and publish asynchronous communication (SigSlot), multi-task shared K/V map (SyncTable), local procedure call (LPC) and basic communication interfaces (task.send). These interfaces not only can transmit strings but also can transmit object.

The multitasking environment provided by JSRE is not only efficient, but also highly decoupled and easy to develop.

Example

  • main.js
#!/bin/javascript

var task = new Task('./task.js');
var msg  = { a: 3, b: 5 };

console.log('new task id:', task.id());

while (true) {
  task.send(msg);
  sys.sleep(1000);
}
  • task.js
console.log('new task run!, id:', Task.me());

Task.async(false);

while (true) {
  var from = {};
  var msg  = Task.recv(from);
  if (msg) {
    console.log('recv a message:', msg, 'from:', from);
  }
}

Or:

console.log('new task run!, id:', Task.me());

var iosched = require('iosched');

Task.on('message', function(msg, from) {
  console.log('recv a message:', msg, 'from:', from);
});

iosched.forever();

Can be run as follows:

$ ./main.js
[JSRE-USR]new task id: 67174472
[JSRE-USR]new task run!, id: 67174472
[JSRE-USR]recv a message: {a:3,b:5} from: {id:67174470}
[JSRE-USR]recv a message: {a:3,b:5} from: {id:67174470}
[JSRE-USR]recv a message: {a:3,b:5} from: {id:67174470}
...

Support

The following shows Task module APIs available for each permissions.

 User ModePrivilege Mode
Task
Task.me
Task.exit
Task.parent
Task.create
Task.count
Task.list
Task.yield
Task.testCancel
Task.priority
Task.send
Task.recv
Task.flush
Task.isMain
Task.getCurrentSize
Task.getBufferSize
Task.setBufferSize
Task.nextTick
Task.cleanup
Task.fd
Task.async
Task.tls
Task.wtls
Task.features
task.id
task.isAlive
task.cancel
task.priority
task.detach
task.send

Task Class

new Task(jsFile[, arg[, opt]])

  • jsFile {String} Specify the file to be run by the new task.
  • arg {String} | {Object} New task argument. default: ''.
  • opt {Object} Create new task option.
    • directory {String} New task file prefix directory.
  • Returns: {task} A new task object.

Create a new task, new task can get argument by ARGUMENT variable. Once a task is created, it will be executed immediately.

Example

var task = new Task('./task.js', 'test_arg_string');

Task.me()

  • Returns: {Integer} Current task ID.

Get current task ID.

Example

var id = Task.me();

Task.exit([retValue])

  • retValue {Integer} Current return value. default: 0.

Current task exit. It is recommended to call this function in the outermost layer, and try not to call this function in event callbacks. This function will request the JS engine to exit the task, and the JS engine will exit the current task when it reaches the outermost layer.

Example

var id = Task.me();
console.log('Current task id', id);
Task.exit(); // Exit

Task.parent()

  • Returns: {Integer} Current task parent ID.

Get current task parent ID. Returns 0 if current task is main task.

Example

var parent = Task.parent();

Task.create(jsFile[, arg[, opt]])

  • jsFile {String} Specify the file to be run by the new task.
  • arg {String} | {Object} New task argument. default: ''.
  • opt {Object} Create new task option.
    • directory {String} New task file prefix directory.
  • Returns: {Object} New task object.

Same as new Task(jsFile[, arg[, opt]]).

Task.count()

  • Returns: {Integer} Number of tasks.

Get the number of tasks in current process.

Example

var cnt = Task.count();

Task.list([exceptMe])

  • exceptMe {Boolean} Except current task. default: false.
  • Returns: {Array} Task ID array.

Get All task ID list in current process.

Example

// Brocast message
var tasks = Task.list(true);
tasks.forEach(function(id) {
  Task.send(id, { msg: 'brocast message!' });
});

Task.yield()

Relax CPU. If there is no task ready with same priority, the current task will continue execute.

Task.testCancel()

  • Returns: {Boolean} Is there any task requesting to delete the current task?.

For safety reasons, tasks cannot be forcibly deleted, and JSRE task deletion uses a request and response mechanism.

Example

while (true) {
  // Do something...
  if (Task.testCancel()) {
    // Do some delete prepare work...
    Task.exit(); // same as return;
  }
}

Tasks can use Task.testCancel() to test if there are other tasks requesting to delete the current task. If there is, then prepare for exit, and then call return at the outermost layer to complete the task deletion or call Task.exit() function.

Task.priority(prio)

  • prio {String} New priority.

prio Can only be:

  • 'low'
  • 'normal'
  • 'high'
  • 'realtime'

Change current task scheduling priority.

JSRE runs on the SylixOS kernel. SylixOS is a preemptive real-time operating system, if high-priority tasks are ready, it can preempt low-priority tasks immediately, Such this scheduling policy can improve the system's real-time response capability. Applications should set critical tasks to high priority and normal tasks to low priority.

Only privileged mode applications can set 'realtime' priority.

Example

Task.priority('low');

Task.send(id, msg)

  • id {Integer} Target task ID.
  • msg {String} | {Object} Message to be send.
  • Returns: {Boolean} Returns true if the transmission succeeds, otherwise returns false.

In JSRE environment, each task has a message queue. This message queue can be used to receive messages sent by other tasks. These messages are bordered and do not merge multiple messages into one. Send and receive only one message at one time.

The message can be a string or an object. If it is an object, the system discards all methods and passes the object to other tasks.

Example

Task.send(otherTaskId, 'This is a test message!');

Task.send(id, buffer[, offset[, length[, addition]]])

  • id {Integer} Target task ID.
  • buffer {Buffer} The data that needs to be send.
  • offset {Integer} Buffer offset. default:0.
  • length {Integer} Read length. default:buffer.length.
  • addition {Object} Buffer additional information. default: no information.
  • Returns: {Boolean} Returns true if the transmission succeeds, otherwise returns false.

Send binary data to the target task. If it is a large amount of binary data transfer, such as video, it is recommended to use shared memory.

Example

var buffer = fs.readFile('./test');

Task.send(otherTaskId, buffer);

The addition object is an additional information, and the receiver can use this information to determine the purpose of the buffer.

Example

var buffer = fs.readFile('./test');

Task.send(otherTaskId, buffer, undefined, undefined, { foo: 'bar' });

Task.recv([from[, timeout]])

  • from {Object} The system informs where the message came from, and the id attribute is specified to indicate the id of the sending task. default:undefined.
  • timeout {Integer} Receive timeout in milliseconds. default: undefined means wait forever.
  • Returns: {String} | {Object} | {Buffer} Received message.

The task can receive a string or an object message. If the sender sends a string, it receives a string type message. If the sender sends an object type message, it receives the object type message. Task defaults use asynchronous mode, users must first call Task.async(false) to convert to synchronous mode before using this method.

Example

Task.async(false); // convert to synchronous mode
while (true) {
  var from = {};
  var msg = Task.recv(from);
  if (msg) {
    console.log('Receive a message from:', from);
    switch(msg.id) {
      case 0:
      // Do something.
      break;

    case 1:
      // Do something.
      break;

    default:
      break;
    }
  }
}

Task.flush()

Clear the messages in the current task message queue. Unreceived messages will be discarded.

Example

// All message discarded.
Task.flush();

Task.isMain()

  • Returns: {Boolean} Whether the current task is the main task of this process.

Get the current task whether it is the main task of this process.

Example

if (Task.isMain()) {
  // This task is main task.
}

Task.getCurrentSize(id)

  • id {Integer} Task ID.
  • Returns: {Integer} Total message size in target task.

JSRE allows user set the size of task message queue buffer. When all unreceived message total size reaches this setting, the new message will be discarded. Task.getCurrentSize(id) gets the size of all message in the message queue of the specified task.

Example

var curMsgSize = Task.getCurrentSize(Task.me());

Task.getBufferSize(id)

  • id {Integer} Task ID.
  • Returns: {Integer} Message queue buffer size of the specified task.

Get the specified task message queue size.

Example

var curQueueSize = Task.getBufferSize(Task.me());

Task.setBufferSize(id, size)

  • id {Integer} Task ID.
  • size {Integer} Task message queue size.

Set the specified task message queue size.

Example

// Set message queue size to 16KBytes.
Task.setBufferSize(Task.me(), 16 * 1024);

Task.nextTick(callback[, ...args]);

  • callback {Function} Callback function.
  • ...args {Any} Any number of any type callback parameters.

Task.nextTick() is the same as Node.js process.nextTick(). This function adds a callback to the system pending queue. This function will be called when the system allows interrupts.

JSRE supports the interrupt mechanism and supports setting the interrupt response mode to control when the interrupt is responded, the setting function is interrupt.level(option) valid options include:

DefinitionDescription
interrupt.ANYTIMEResponds interrupt at any time. Only for Privileged Mode applications.
interrupt.ONPENDResponds interrupt when program is blocked, such as blocking at network receive and send function call.
interrupt.ONRPENDResponds interrupt only when system has a read/receive blocking. This is default setting.
interrupt.ONPOLLResponds interrupt only when sys.sleep() or iosched.poll() / forever() function blocked.

When interrupt respond is enabled, JSRE first checks if the pending queue has any pending callback, and if exist, the callback will executes immediately.

Example

function callback() {
  console.log('callback run!');
}

Task.nextTick(callback);

Task.cleanup(callback[, ...args]);

  • callback {Function} Callback function.
  • ...args {Any} Any number of any type callback parameters.

Add a callback function, which will be executed when the task exits normally. The execution order is the opposite direction of the installation order.

Task.fd()

  • Returns: {Integer} Task message event file descriptor.

Get current task message event file descriptor. Only for iosched readable event detection in current tasks.

Example

var ioevent = iosched.event(iosched.READ, Task.fd(),
() => {
  var from = {};
  var msg = Task.recv(from);
  if (msg) {
    console.log('receive:', msg, 'from:', from);
  }
  return true;
});

iosched.add(ioevent);

while (true) {
  iosched.poll();
}

Task.async([enable])

  • enable {Boolean} Whether to enable asynchronous mode. default: true.

Receiving task messages using asynchronous mode. After enabling this mode, the Task.recv() method will be deleted.

Example

Task.async(); // asynchronous mode
Task.async(false); // synchronous mode

Task Local Storage

Each task has its own local storage manager, and multiple tasks cannot access each other.

Task.tls

  • {Map}

Task.tls is a Map type object, which is valid for the current entire task. The current task can be stored, read, traversed, and deleted anywhere. It should be noted that: Map type objects refer to the stored objects. If you do not manually delete them, a memory leak will occur.

Example

Task.tls.set(key, value);
Task.tls.get(key);
Task.tls.delete(key);

Task.wtls

  • {WeakMap}

Task.wtls is a WeakMap type object, which is valid for the current entire task. The current task can be stored, read and deleted anywhere. WeakMap makes weak references to key. It only supports get(), set(), has() and delete() operations. It does not support traversal iteration.

Example

Task.wtls.set(key, value);
Task.wtls.get(key);
Task.wtls.delete(key);

Task.features

  • {Map}

Task.features is a Map type object, This map saves some working feature parameters of the current JS context, which can be adjusted to control the operation of JSRE.

Task Object

task.id()

  • Returns: {Integer} Task ID.

Get the task ID created before, If the return 0, the task ID is invalid.

Example

var task = new Task('./task.js');

console.log('new task ID:', task.id());

task.isAlive()

  • Returns: {Boolean} Is this task alive.

Get whether the specified task is alive. If this task is detached, you cannot get the alive status of this task.

Example

var task = new Task('./task.js');

console.log('task id:', task.id(), 'alive:', task.isAlive());

task.cancel([callback])

  • callback {Function} Execute this callback after the task exits. default: no callback.
    • id {Integer} Task ID which has been canceled.
    • retVal {Integer} Task returned value.
  • Returns: {Integer} 0: Request success, 1: The task no longer exists.

This function requests to delete the target task. The target task can detect the delete request by using Task.testCancel().

Example

var task = new Task('./task.js');

task.cancel((id, retVal) => {
  console.log('Task canceled, return value:', retVal);
});

task.priority(prio)

  • prio {String} New priority.

prio Can only be:

  • 'low'
  • 'normal'
  • 'high'
  • 'realtime'

Set target task priority. Only privileged mode applications can set 'realtime' priority. For details, please refer to Task.priority().

Example

var task = new Task('./task.js');

task.priority('low');

task.detach()

Detach child task, the system reclaims resources after the child task exits, and cannot obtain the task return value.

Cannot operate this task object after task.detach().

task.send(msg)

  • msg {String} | {Object} Message to be send.
  • Returns: {Boolean} Returns true if the transmission succeeds, otherwise returns false.

Send a message to task. For details, please refer to Task.send().

Example

var task = new Task('./task.js');

task.send('test message');

task.send(buffer[, offset[, length[, addition]]])

  • buffer {Buffer} The data that needs to be send.
  • offset {Integer} Buffer offset. default:0.
  • length {Integer} Read length. default:buffer.length.
  • addition {Object} Buffer additional information. default: no information.
  • Returns: {Boolean} Returns true if the transmission succeeds, otherwise returns false.

Send binary data to task. If it is a large amount of binary data transfer, such as video, it is recommended to use shared memory.

Example

var buffer = fs.readFile('./test');

task.send(buffer);

The addition object is an additional information, and the receiver can use this information to determine the purpose of the buffer.

Example

var buffer = fs.readFile('./test');

task.send(buffer, undefined, undefined, { foo: 'bar' });

Task Events

The Task class inherits from the EventEmitter class. The following events are thrown in some specific situations.

message

When current task receives a message system will emit this event.

Example

Task.on('message', function(msg, from) {
  // ...
});

Example

Task.on('message', function(msg, from) {
  if (Buffer.isBuffer(msg)) {
    console.log(msg.addition);
  }
});

uncaughtException

When the deep callback function throws an exception, the current task can get the exception information by this event.

If this event is not subscribed, the system will print the exception information and the call stack, then use 1 as the exit code (exit(1)) and exit.

Example

Task.on('uncaughtException', (error) => {
  console.log('callback except!', error);
  console.trace(error.stack);
});

unhandledRejection

When the javascript engine encounters a promise reject that has unhandled, this event will be generated. The unhandledRejection event will not cause the process to exit now, but JSRE may terminate the process in the future, when the user does not caught this event.

Example

Task.on('unhandledRejection', (reason, promise) => {
  console.log('Unhandled Promise rejection:', reason);
  if (reason instanceof Error) {
    console.trace(reason.stack);
  }
});
文档内容是否对您有所帮助?
有帮助
没帮助